Event-Driven Architecture

The Surprising Part: A Single Slow Service Can Bring Down Your Entire Platform

# SYNCHRONOUS CALL CHAIN - the silent killer
# Upload Service → Classification Service → Analytics Service → Email Service

import httpx
import time

async def process_upload_synchronous(doc_id: str, text: str):
    """
    This looks clean. It will cause production incidents.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:

        # Step 1: Classify (Classification Service - usually fast, 200ms)
        t1 = time.perf_counter()
        classify_resp = await client.post(
            "http://classification-service/classify",
            json={"text": text, "doc_id": doc_id},
        )
        classify_resp.raise_for_status()
        label = classify_resp.json()["label"]
        print(f"Classification: {(time.perf_counter()-t1)*1000:.0f}ms")

        # Step 2: Record analytics (Analytics Service - usually fast, 100ms)
        t2 = time.perf_counter()
        analytics_resp = await client.post(
            "http://analytics-service/record",
            json={"doc_id": doc_id, "label": label, "event": "classified"},
        )
        analytics_resp.raise_for_status()
        print(f"Analytics: {(time.perf_counter()-t2)*1000:.0f}ms")

        # Step 3: Send email (Email Service - network call to SendGrid, 500ms–5s)
        t3 = time.perf_counter()
        email_resp = await client.post(
            "http://email-service/send",
            json={"doc_id": doc_id, "label": label},
        )
        email_resp.raise_for_status()
        print(f"Email: {(time.perf_counter()-t3)*1000:.0f}ms")

        return {"doc_id": doc_id, "label": label}

# FAILURE SCENARIO:
# It is 11 PM on a Friday. SendGrid has a 30-second degradation incident.
# Step 3 takes 30 seconds instead of 500ms.
# Upload Service response time: 30+ seconds.
# Users think uploads are broken.
# Support tickets flood in.
# On-call engineer is woken up.
# Root cause: EMAIL SERVICE is slow, but UPLOAD SERVICE bears the blast radius.
#
# The upload is done. The classification is done. The user just wants their document.
# Why should the upload fail because an email provider is slow?

This is temporal coupling - your service's availability is tied to the availability of every service it calls. Event-driven architecture breaks this coupling.

What You Will Learn

  • Why Event-Driven - temporal coupling, the cascade failure problem, loose coupling benefits
  • Message Queue Fundamentals - point-to-point vs pub/sub, delivery guarantees, consumer groups
  • Kafka with Python - producers, consumers, consumer groups, offset management, real production example
  • Redis Streams - lightweight alternative for simpler event flows
  • Event Sourcing - storing events instead of state, rebuilding aggregates, snapshots
  • CQRS - separating write and read models, updating read models from events
  • Saga Pattern - distributed transactions, choreography vs orchestration, compensating transactions

Prerequisites: Python async/await, Docker (for Kafka), basic understanding of queues.

Part 1: Why Event-Driven?

Before: Synchronous Chain

Upload Request
      │
      ▼
Upload Service ──────────────────────────────► Classification Service
(waits 200ms)                                  (processes, replies)
      │
      ▼
Upload Service ──────────────────────────────► Analytics Service
(waits 100ms)                                  (records, replies)
      │
      ▼
Upload Service ──────────────────────────────► Email Service
(waits 500ms–30s!)                             (sends email, or HANGS)
      │
      ▼
Response to user: 30+ seconds, or timeout error

After: Event-Driven Async

Upload Request
      │
      ▼
Upload Service ──► Store file ──► Publish "doc.uploaded" event ──► Response to user (50ms)
                                                │
                                ┌───────────────┼───────────────┐
                                ▼               ▼               ▼
                         Classification  Analytics        Email Service
                         Service         Service          (independent,
                         (subscribes to  (subscribes to   may take 30s,
                         "doc.uploaded") "doc.uploaded")  does not block
                                │               │         the user)
                                │               │
                            Publishes       Publishes
                        "doc.classified"  "analytics.recorded"

The upload completes in 50ms instead of 30 seconds. If Email Service is down, documents still upload and get classified - emails catch up when Email Service recovers.
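
To make the latency difference concrete, here is a minimal in-process sketch that uses asyncio.Queue as a stand-in for the broker. The names handle_upload and email_worker and the 200ms delay are illustrative assumptions, not part of the services described here; a real deployment would publish to Kafka or Redis Streams instead.

```python
import asyncio
import time


async def email_worker(queue: asyncio.Queue, sent: list[str]) -> None:
    """Hypothetical slow consumer standing in for the Email Service."""
    while True:
        event = await queue.get()
        await asyncio.sleep(0.2)  # simulated slow email provider
        sent.append(event["doc_id"])
        queue.task_done()


async def handle_upload(doc_id: str, queue: asyncio.Queue) -> float:
    """Store the file (omitted), publish the event, and return immediately."""
    start = time.perf_counter()
    await queue.put({"type": "doc.uploaded", "doc_id": doc_id})
    return time.perf_counter() - start


async def main() -> tuple[float, list[str]]:
    queue: asyncio.Queue = asyncio.Queue()
    sent: list[str] = []
    worker = asyncio.create_task(email_worker(queue, sent))

    elapsed = await handle_upload("doc-001", queue)  # does not wait for the email
    await queue.join()  # only the demo waits here, so it can inspect the result
    worker.cancel()
    return elapsed, sent


upload_seconds, emails_sent = asyncio.run(main())
print(f"upload returned in {upload_seconds * 1000:.1f}ms, emails sent: {emails_sent}")
```

The upload handler returns as soon as the event is enqueued; the slow consumer finishes later on its own schedule.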

The Three Decouplings

| Type | Synchronous | Event-Driven |
|---|---|---|
| Temporal | Both must be available simultaneously | Producer and consumer are independent in time |
| Spatial | Producer knows consumer's address | Producer knows only the topic name |
| Logical | Producer depends on consumer's API | Producer depends on event schema |

Part 2: Message Queue Fundamentals

Point-to-Point vs Pub/Sub

POINT-TO-POINT (Queue)                            PUB/SUB (Topic)
─────────────────────                             ────────────────

Producer ─► [Queue] ─► Consumer A                 Producer ─► [Topic]
                                                                 │
                                                        ┌────────┼────────┐
                                                        ▼        ▼        ▼
                                                   Consumer A Consumer B Consumer C

• Each message consumed by exactly ONE consumer   • Each subscriber gets ALL messages
• Work distribution (load balancing)              • Fan-out (broadcast)
• Use for: task queues, job processing            • Use for: event notification, audit logs
• Example: upload jobs across OCR workers         • Example: "order placed" to inventory + email + analytics
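
The two delivery models can be sketched in a few lines of plain Python. The classes PointToPointQueue and PubSubTopic are hypothetical in-memory stand-ins written for this illustration; they are not broker APIs, and round-robin is only one possible way a queue might distribute work.

```python
class PointToPointQueue:
    """Each message is handed to exactly one consumer (round-robin here)."""

    def __init__(self, consumers: list[str]):
        self._consumers = consumers
        self._next = 0
        self.delivered: dict[str, list[str]] = {c: [] for c in consumers}

    def send(self, message: str) -> None:
        consumer = self._consumers[self._next % len(self._consumers)]
        self._next += 1
        self.delivered[consumer].append(message)


class PubSubTopic:
    """Every subscriber receives its own copy of every message."""

    def __init__(self, subscribers: list[str]):
        self.delivered: dict[str, list[str]] = {s: [] for s in subscribers}

    def publish(self, message: str) -> None:
        for inbox in self.delivered.values():
            inbox.append(message)


work_queue = PointToPointQueue(["ocr-1", "ocr-2"])
for job in ["job-1", "job-2", "job-3"]:
    work_queue.send(job)

topic = PubSubTopic(["inventory", "email", "analytics"])
topic.publish("order placed")

print(work_queue.delivered)  # jobs are split between the two workers
print(topic.delivered)       # all three subscribers see the same event
```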

Delivery Guarantees

| Guarantee | Definition | Risk | Use When |
|---|---|---|---|
| At-most-once | Message delivered 0 or 1 times | Can be lost | Metrics, telemetry (loss is acceptable) |
| At-least-once | Message delivered 1 or more times | Can be duplicated | Most business events (design for idempotency) |
| Exactly-once | Delivered exactly once | Most complex, lowest throughput | Financial transactions, billing |

The practical default: design for at-least-once delivery and make your consumers idempotent. Exactly-once is expensive and rarely worth it outside finance.
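
The difference between the first two guarantees comes down to when the message is acknowledged relative to processing. The following is a simplified in-memory sketch of that idea; FlakyHandler and the two deliver_* functions are hypothetical names invented for the illustration, and a real broker tracks acknowledgements very differently.

```python
class FlakyHandler:
    """Fails on its first call (simulated crash), then succeeds."""

    def __init__(self) -> None:
        self.calls = 0
        self.processed: list[str] = []

    def __call__(self, message: str) -> None:
        self.calls += 1
        if self.calls == 1:
            raise RuntimeError("simulated crash")
        self.processed.append(message)


def deliver_at_most_once(messages: list[str], handler) -> None:
    pending = list(messages)
    while pending:
        message = pending.pop(0)  # acknowledge BEFORE processing
        try:
            handler(message)
        except RuntimeError:
            pass  # the message is already gone, so it is lost


def deliver_at_least_once(messages: list[str], handler) -> None:
    pending = list(messages)
    while pending:
        message = pending[0]
        try:
            handler(message)
        except RuntimeError:
            continue  # not acknowledged, so it is delivered again
        pending.pop(0)  # acknowledge AFTER successful processing


lossy = FlakyHandler()
deliver_at_most_once(["a", "b"], lossy)

reliable = FlakyHandler()
deliver_at_least_once(["a", "b"], reliable)

print(lossy.processed, reliable.processed)
```

With at-least-once delivery a duplicate appears whenever processing succeeds but the acknowledgement is lost, which is why consumers need to be idempotent.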

Idempotent Consumers

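# An idempotent consumer can receive the same message more than once
# without changing the outcome after the first successful processing.
# Assumes ProcessingRecord (a SQLAlchemy model with a unique event_id column)
# and ocr_engine are defined elsewhere in the service.
import logging
from datetime import datetime

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

logger = logging.getLogger(__name__)

async def handle_document_uploaded(event: dict, db: AsyncSession):
    doc_id = event["doc_id"]

    # Check if we already processed this event
    # Use the event_id as an idempotency key
    existing = await db.execute(
        select(ProcessingRecord).where(ProcessingRecord.event_id == event["event_id"])
    )
    if existing.scalar():
        logger.info(f"Skipping duplicate event for doc {doc_id}")
        return  # Already processed - skip safely

    # Process the document (the result would be stored or published here)
    result = await ocr_engine.extract_text(doc_id)

    # Record that we processed this event.
    # A unique constraint on event_id guards against two workers
    # racing past the check above.
    db.add(ProcessingRecord(
        event_id=event["event_id"],
        doc_id=doc_id,
        processed_at=datetime.utcnow(),
    ))
    await db.commit()
    logger.info(f"Processed doc {doc_id} from event {event['event_id']}")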

Part 3: Kafka with Python

Core Kafka Concepts

Kafka Topic: "doc.uploaded"
              │
┌─────────────┼─────────────┐
▼             ▼             ▼
Partition 0   Partition 1   Partition 2
[msg0][msg3]  [msg1][msg4]  [msg2][msg5]
│             │             │
offset        offset        offset

• Topic: named feed of events (like a database table)
• Partition: ordered log within a topic (enables parallelism)
• Offset: position of a message within a partition (like a cursor)
• Consumer Group: group of consumers that share a topic's partitions
    - Each partition is assigned to exactly ONE consumer in the group
    - Adding consumers up to the partition count increases parallelism
    - Adding more consumers than partitions leaves the extra consumers idle
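
A simplified sketch of how keys map to partitions and how partitions are shared inside a consumer group. Both functions are illustrative assumptions: real Kafka clients use their own key-hashing functions and configurable assignment strategies, so the CRC32 hash and round-robin assignment below only mimic the general behaviour.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Same key always lands in the same partition (stable hash, then modulo)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_partitions(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Round-robin assignment: each partition goes to exactly one consumer."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Events for one document keep their order because they share a partition.
first = partition_for("doc-001", 3)
second = partition_for("doc-001", 3)

two_consumers = assign_partitions(3, ["c1", "c2"])
four_consumers = assign_partitions(3, ["c1", "c2", "c3", "c4"])

print(first == second)   # same key, same partition
print(two_consumers)     # one consumer handles two partitions
print(four_consumers)    # the fourth consumer gets nothing to do
```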

Producer

# processing_service/kafka/producer.py
import json
import asyncio
from confluent_kafka import Producer, KafkaError
from datetime import datetime, timezone
import uuid
import logging

logger = logging.getLogger("processing_service.producer")

class EventProducer:
    """
    Kafka event producer with delivery confirmation.
    Uses confluent-kafka (C-based, higher performance than kafka-python).
    """

    def __init__(self, bootstrap_servers: str = "localhost:9092"):
        self._producer = Producer({
            "bootstrap.servers": bootstrap_servers,
            "acks": "all",                 # Wait for all in-sync replicas to confirm
            "retries": 5,                  # Retry on transient failures
            "retry.backoff.ms": 500,
            "compression.type": "snappy",  # Compress messages (saves bandwidth)
            "batch.size": 65536,           # Batch up to 64 KB before sending
            "linger.ms": 5,                # Wait up to 5ms to batch more messages
            "enable.idempotence": True,    # Prevent duplicate messages on retry
        })

    def _delivery_callback(self, err, msg):
        """Called by confluent-kafka when delivery is confirmed or failed."""
        if err:
            logger.error(
                f"Message delivery failed: {err}",
                extra={"topic": msg.topic(), "partition": msg.partition()},
            )
        else:
            logger.debug(
                f"Message delivered: topic={msg.topic()} "
                f"partition={msg.partition()} offset={msg.offset()}"
            )

    async def publish(self, topic: str, event: dict, key: str | None = None) -> None:
        """
        Publish an event to a Kafka topic.
        Adds standard envelope fields: event_id, schema_version, published_at.
        """
        envelope = {
            "event_id": str(uuid.uuid4()),
            "schema_version": "1.0",
            "published_at": datetime.now(timezone.utc).isoformat(),
            **event,
        }

        # produce() only enqueues the message in the client's local buffer.
        # Running it in the default executor keeps the call off the event loop thread.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            lambda: self._producer.produce(
                topic=topic,
                key=key.encode("utf-8") if key else None,
                value=json.dumps(envelope).encode("utf-8"),
                on_delivery=self._delivery_callback,
            ),
        )
        # Serve delivery callbacks for messages already sent - non-blocking.
        # This does not flush the buffer; use flush() for that.
        self._producer.poll(0)

    async def flush(self, timeout: float = 30.0) -> None:
        """Wait until all messages are delivered. Call before shutdown."""
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            lambda: self._producer.flush(timeout=timeout),
        )

    async def close(self) -> None:
        await self.flush()
        logger.info("Kafka producer closed")

# Usage in Upload Service route
async def upload_document_and_publish(
    file_bytes: bytes,
    doc_id: str,
    user_id: str,
    producer: EventProducer,
) -> None:
    # Store file (assumes a `storage` client is provided elsewhere)
    storage_key = await storage.put(f"raw/{doc_id}", file_bytes)

    # Publish event - returns once the message is enqueued, not once it is delivered
    await producer.publish(
        topic="doc.uploaded",
        key=doc_id,  # Keyed by doc_id → same doc always goes to same partition (ordering)
        event={
            "doc_id": doc_id,
            "user_id": user_id,
            "storage_key": storage_key,
            "filename": "invoice.pdf",  # Hard-coded for the example
            "size_bytes": len(file_bytes),
            "mime_type": "application/pdf",
        },
    )
    logger.info(f"Published doc.uploaded event for doc {doc_id}")

Consumer with Consumer Groups

# processing_service/kafka/consumer.py
import json
import asyncio
from confluent_kafka import Consumer, KafkaError, TopicPartition
import logging
import signal
from typing import Callable, Awaitable

logger = logging.getLogger("processing_service.consumer")

class EventConsumer:
    """
    Kafka consumer with consumer group support.
    Commits offsets manually, only after successful processing (at-least-once semantics).
    """

    def __init__(
        self,
        bootstrap_servers: str,
        group_id: str,
        topics: list[str],
    ):
        self._consumer = Consumer({
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",  # Start from beginning if no committed offset
            "enable.auto.commit": False,      # We commit manually (after processing)
            "max.poll.interval.ms": 300000,   # 5 minutes - max time between polls
            "session.timeout.ms": 45000,
            "heartbeat.interval.ms": 3000,
        })
        self._consumer.subscribe(topics)
        self._running = False

    async def consume(
        self,
        handler: Callable[[dict], Awaitable[None]],
        batch_size: int = 10,
        poll_timeout: float = 1.0,
    ) -> None:
        """
        Consume messages and call handler for each.
        Commits offsets only after successful handler execution.
        """
        self._running = True
        loop = asyncio.get_running_loop()

        # The member id may be empty until the consumer has joined the group
        logger.info(f"Consumer started, member_id={self._consumer.memberid()}")

        try:
            while self._running:
                # Poll is synchronous - run in thread pool
                messages = await loop.run_in_executor(
                    None,
                    lambda: self._consumer.consume(num_messages=batch_size, timeout=poll_timeout),
                )

                if not messages:
                    continue

                for message in messages:
                    if message.error():
                        if message.error().code() == KafkaError._PARTITION_EOF:
                            # Normal - reached end of partition
                            continue
                        logger.error(f"Consumer error: {message.error()}")
                        continue

                    try:
                        event = json.loads(message.value().decode("utf-8"))
                        logger.info(
                            f"Processing event {event.get('event_id', 'unknown')}",
                            extra={
                                "topic": message.topic(),
                                "partition": message.partition(),
                                "offset": message.offset(),
                            },
                        )

                        # Call the event handler
                        await handler(event)

                        # Commit offset ONLY after successful processing
                        await loop.run_in_executor(
                            None,
                            lambda: self._consumer.commit(message=message),
                        )

                    except json.JSONDecodeError as exc:
                        logger.error(f"Invalid JSON in message: {exc}", extra={"raw": message.value()})
                        # Dead-letter queue: send malformed messages somewhere
                        await self._send_to_dlq(message)
                        await loop.run_in_executor(None, lambda: self._consumer.commit(message=message))

                    except Exception as exc:
                        logger.error(f"Handler failed for event: {exc}", exc_info=True)
                        # Skipping the commit does not by itself cause redelivery: the loop
                        # moves on, and a later commit on this partition also covers this
                        # offset. The message is only re-read after a restart or rebalance
                        # that happens before such a commit.
                        # In production: track retry counts, retry explicitly (seek back or
                        # use a retry topic), and send to a DLQ after N failures.
        finally:
            # Close only after the poll loop has exited, so close() never races
            # with a poll still running in the executor thread
            self._consumer.close()

    async def _send_to_dlq(self, message) -> None:
        """Send unparseable or permanently failed messages to a dead-letter queue."""
        logger.warning(
            f"Sending message to DLQ: {message.topic()}/{message.partition()}/{message.offset()}"
        )
        # In production: publish to a "topic.dlq" topic with metadata

    def stop(self) -> None:
        # Only signal the loop to exit; consume() closes the consumer afterwards
        self._running = False

# Wiring it all together
async def run_processing_consumer():
    # Assumes EventProducer (producer.py above) is importable here, and that
    # `storage` and `ocr_engine` are provided elsewhere in the service.
    producer = EventProducer(bootstrap_servers="localhost:9092")
    consumer = EventConsumer(
        bootstrap_servers="localhost:9092",
        group_id="processing-service",
        topics=["doc.uploaded"],
    )

    async def handle_doc_uploaded(event: dict) -> None:
        doc_id = event["doc_id"]
        storage_key = event["storage_key"]

        logger.info(f"Processing document {doc_id}")

        # Download from storage
        file_bytes = await storage.get(storage_key)

        # Run OCR (CPU-bound, so keep it off the event loop)
        text = await asyncio.get_running_loop().run_in_executor(
            None, ocr_engine.extract_text, file_bytes
        )

        # Publish next event in the chain
        await producer.publish(
            topic="doc.processed",
            key=doc_id,
            event={
                "doc_id": doc_id,
                "user_id": event["user_id"],
                "storage_key": storage_key,
                "text": text[:10000],  # Truncate for message size
                "text_length": len(text),
                "processing_source": "processing-service-v2",
            },
        )

    # Graceful shutdown on SIGTERM
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, consumer.stop)

    await consumer.consume(handle_doc_uploaded)
    await producer.close()  # Flush any events still buffered before exiting

if __name__ == "__main__":
    asyncio.run(run_processing_consumer())
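
The consumer comments mention retry count tracking and a DLQ after N failures without showing it. One possible shape for that logic is sketched below, independent of Kafka. RetryingProcessor, its process() return convention, and the in-memory attempt counter are all assumptions made for this illustration; a production version would need the counter to survive restarts (for example in a message header or an external store).

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]


class RetryingProcessor:
    """Counts failures per event_id and parks the event in a DLQ after N attempts."""

    def __init__(self, handler: Handler, send_to_dlq: Handler, max_attempts: int = 3):
        self._handler = handler
        self._send_to_dlq = send_to_dlq
        self._max_attempts = max_attempts
        self._attempts: dict[str, int] = defaultdict(int)

    async def process(self, event: dict) -> bool:
        """Return True when the caller may commit the offset and move on."""
        event_id = event["event_id"]
        try:
            await self._handler(event)
        except Exception:
            self._attempts[event_id] += 1
            if self._attempts[event_id] < self._max_attempts:
                return False  # caller should retry this event
            await self._send_to_dlq(event)  # give up: park it for inspection
        self._attempts.pop(event_id, None)
        return True


async def demo() -> tuple[list[bool], list[dict]]:
    dlq: list[dict] = []

    async def always_fails(event: dict) -> None:
        raise ValueError("cannot parse document")

    async def park_in_dlq(event: dict) -> None:
        dlq.append(event)

    processor = RetryingProcessor(always_fails, park_in_dlq, max_attempts=3)
    event = {"event_id": "evt-1", "doc_id": "doc-001"}
    results = [await processor.process(event) for _ in range(3)]
    return results, dlq


outcomes, dead_letters = asyncio.run(demo())
print(outcomes, dead_letters)
```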

The Full Event Flow Across Services

doc.uploaded
├── Processing Service (consumer group: processing-service)
│       Publishes: doc.processed
└── Analytics Service (consumer group: analytics-service)
        Records upload metric

doc.processed
├── Classification Service (consumer group: classification-service)
│       Publishes: doc.classified
└── Processing Service (consumer group: processing-service-audit)
        Writes processing audit log

doc.classified
├── Notification Service (consumer group: notification-service)
│       Sends email to user
├── Analytics Service (consumer group: analytics-service)
│       Updates classification statistics
└── Search Indexer (consumer group: search-indexer)
        Indexes document for full-text search

Each consumer group maintains its own offset - adding the Search Indexer later does not require any changes to other services or any re-publishing of events (Kafka retains messages for the configured retention period).
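
The per-group offset idea can be shown with a toy append-only log. InMemoryLog is a hypothetical stand-in written for this illustration, not a Kafka API; it starts new groups at offset 0, which mirrors the "earliest" reset policy used in the consumer configuration.

```python
class InMemoryLog:
    """Append-only log where each consumer group tracks its own read position."""

    def __init__(self) -> None:
        self._messages: list[str] = []
        self._offsets: dict[str, int] = {}

    def append(self, message: str) -> None:
        self._messages.append(message)

    def poll(self, group_id: str, max_messages: int = 10) -> list[str]:
        start = self._offsets.get(group_id, 0)  # unknown group starts at the beginning
        batch = self._messages[start:start + max_messages]
        self._offsets[group_id] = start + len(batch)  # "commit" the new position
        return batch


log = InMemoryLog()
for doc in ["doc-001", "doc-002", "doc-003"]:
    log.append(doc)

notified = log.poll("notification-service")
# A group added later still sees every retained message.
indexed = log.poll("search-indexer")
# The first group has already advanced, so it gets nothing new.
notified_again = log.poll("notification-service")

print(notified, indexed, notified_again)
```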

Part 4: Redis Streams

Redis Streams is a lighter-weight alternative to Kafka for lower-throughput use cases.

# notification_service/redis_streams.py
import redis.asyncio as redis
import json
import asyncio
import logging
import uuid

logger = logging.getLogger("notification_service.streams")

class RedisStreamProducer:
    """Publish events to Redis Streams - lighter than Kafka for modest event volumes."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self._redis = redis.from_url(redis_url, decode_responses=True)

    async def publish(self, stream: str, event: dict, maxlen: int = 100_000) -> str:
        """
        XADD: Add entry to stream.
        Returns the message ID (timestamp-sequence).
        maxlen: trim stream to this length (MAXLEN ~ approximate for performance).
        """
        event_with_meta = {
            # A UUID avoids the collisions a second-resolution timestamp would produce
            "event_id": f"evt_{uuid.uuid4().hex}",
            **{k: str(v) for k, v in event.items()},  # Redis stores strings
        }
        # XADD stream_name * field1 value1 field2 value2
        # "*" means Redis auto-generates the ID (recommended)
        message_id = await self._redis.xadd(
            stream,
            event_with_meta,
            maxlen=maxlen,
            approximate=True,  # MAXLEN ~ lets Redis trim lazily instead of exactly
        )
        logger.debug(f"Published to stream {stream}, id={message_id}")
        return message_id

class RedisStreamConsumer:
    """
    Consume from Redis Streams with consumer group support.
    Provides at-least-once delivery via acknowledgement.
    """

    def __init__(
        self,
        redis_url: str,
        stream: str,
        group_name: str,
        consumer_name: str,
    ):
        self._redis = redis.from_url(redis_url, decode_responses=True)
        self._stream = stream
        self._group = group_name
        self._consumer = consumer_name

    async def setup(self) -> None:
        """Create the consumer group if it doesn't exist."""
        try:
            # XGROUP CREATE stream group $ MKSTREAM
            # "$" means: only consume new messages (not historical)
            # "0" means: consume from the beginning
            await self._redis.xgroup_create(
                self._stream,
                self._group,
                id="$",  # Start from new messages only
                mkstream=True,
            )
            logger.info(f"Created consumer group {self._group} on stream {self._stream}")
        except redis.ResponseError as e:
            if "BUSYGROUP" in str(e):
                logger.info(f"Consumer group {self._group} already exists")
            else:
                raise

    async def consume(self, handler, batch_size: int = 10, block_ms: int = 2000) -> None:
        """
        XREADGROUP: Read messages from the consumer group.
        block_ms: block for this long if no messages available (long-polling).
        """
        logger.info(f"Consumer {self._consumer} started on stream {self._stream}")

        while True:
            # XREADGROUP GROUP group consumer COUNT batch_size BLOCK block_ms STREAMS stream >
            # ">" means: messages not yet delivered to any consumer in this group
            messages = await self._redis.xreadgroup(
                groupname=self._group,
                consumername=self._consumer,
                streams={self._stream: ">"},
                count=batch_size,
                block=block_ms,
            )

            if not messages:
                continue  # Timeout - loop back to poll

            for stream_name, stream_messages in messages:
                for message_id, fields in stream_messages:
                    try:
                        # Fields arrive as strings; convert types in the handler as needed
                        event = dict(fields)
                        await handler(event)

                        # XACK: Acknowledge successful processing
                        # Without XACK, the message stays in the "Pending Entries List" (PEL)
                        # until it is acknowledged or claimed (see claim_pending)
                        await self._redis.xack(self._stream, self._group, message_id)

                    except Exception as exc:
                        logger.error(
                            f"Failed to process message {message_id}: {exc}",
                            exc_info=True,
                        )
                        # Don't XACK - the message stays in the PEL. Reading with ">" will
                        # not return it again, so claim_pending() must pick it up.
                        # In production: track retry count and move to DLQ after N failures

    async def claim_pending(self, min_idle_ms: int = 60_000) -> list:
        """
        Reclaim messages that were delivered but not acknowledged within min_idle_ms.
        Run periodically to handle crashed consumers.
        XAUTOCLAIM: Transfers ownership of idle messages to this consumer.
        """
        result = await self._redis.xautoclaim(
            self._stream,
            self._group,
            self._consumer,
            min_idle_time=min_idle_ms,
            start_id="0-0",
            count=100,
        )
        reclaimed = result[1] if result else []
        if reclaimed:
            logger.info(f"Reclaimed {len(reclaimed)} idle messages")
        return reclaimed

# Usage
async def run_notification_consumer():
    # Assumes `email_sender` and `user_service` are provided elsewhere in the service
    consumer = RedisStreamConsumer(
        redis_url="redis://localhost:6379",
        stream="doc.classified",
        group_name="notification-service",
        consumer_name="notification-worker-1",
    )
    await consumer.setup()

    async def handle_classified(event: dict) -> None:
        doc_id = event["doc_id"]
        label = event["label"]
        user_id = event["user_id"]

        await email_sender.send(
            to=await user_service.get_email(user_id),
            subject="Your document is ready",
            body=f"Document classified as: {label}",
        )
        logger.info(f"Sent notification for doc {doc_id} to user {user_id}")

    await consumer.consume(handle_classified)

Kafka vs Redis Streams - When to Choose

| Factor | Redis Streams | Kafka |
|---|---|---|
| Throughput | < 100k msg/sec | > 1M msg/sec |
| Message retention | RAM-limited (or RDB/AOF) | Disk-based, configurable (days/weeks) |
| Setup complexity | Low (if Redis already in stack) | High (ZooKeeper or KRaft, cluster) |
| Replay from beginning | Yes (up to retention limit) | Yes (for configured retention period) |
| Consumer groups | Yes (XGROUP) | Yes (built-in, more mature) |
| Operational overhead | Low | High |
| When to choose | Simple event flows, already using Redis | High-throughput, long retention, audit logs |

Part 5: Event Sourcing

Storing Events, Not State

Traditional persistence: you store the current state of an entity. Event sourcing: you store the sequence of events that led to the current state.

# Traditional approach: store current state
# UPDATE documents SET status='classified', label='invoice' WHERE id='doc-001'
# You lose: when was it classified? By what model version? What was the previous label?

# Event sourcing: store every event
# INSERT INTO events (aggregate_id, event_type, data, timestamp) VALUES (...)
# You gain: full audit history, time travel, rebuild from any point

# event_store.py
from sqlalchemy import (
    Column, String, JSON, DateTime, Integer, Text, UniqueConstraint, func, select,
)
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase
import json
from datetime import datetime, timezone
import uuid

class Base(DeclarativeBase):
    pass

class StoredEvent(Base):
    __tablename__ = "events"
    # Unique per-aggregate sequence: a concurrent writer that loses the race
    # between the version check and the insert fails on commit
    __table_args__ = (UniqueConstraint("aggregate_id", "sequence_number"),)

    id = Column(Integer, primary_key=True, autoincrement=True)
    event_id = Column(String(36), unique=True, nullable=False, default=lambda: str(uuid.uuid4()))
    aggregate_id = Column(String(255), nullable=False, index=True)
    aggregate_type = Column(String(100), nullable=False)
    event_type = Column(String(100), nullable=False)
    event_version = Column(Integer, nullable=False, default=1)
    sequence_number = Column(Integer, nullable=False)  # Per-aggregate ordering
    data = Column(JSON, nullable=False)
    # "metadata" is a reserved attribute name on declarative classes,
    # so the column is mapped under a different attribute name
    event_metadata = Column("metadata", JSON, nullable=True)
    created_at = Column(DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc))

class EventStore:
    """
    Append-only event store.
    Events are immutable - never update or delete them.
    """

    def __init__(self, session: AsyncSession):
        self._session = session

    async def append(
        self,
        aggregate_id: str,
        aggregate_type: str,
        events: list[dict],
        expected_version: int | None = None,
    ) -> None:
        """
        Append events to the store.
        expected_version: optimistic concurrency - fails if aggregate was modified
        by another process since we last read it.
        """
        # Get current sequence number
        result = await self._session.execute(
            select(func.max(StoredEvent.sequence_number)).where(
                StoredEvent.aggregate_id == aggregate_id
            )
        )
        current_version = result.scalar() or 0

        if expected_version is not None and current_version != expected_version:
            raise OptimisticConcurrencyError(
                f"Aggregate {aggregate_id} was modified. "
                f"Expected version {expected_version}, got {current_version}"
            )

        for i, event in enumerate(events, start=1):
            stored = StoredEvent(
                aggregate_id=aggregate_id,
                aggregate_type=aggregate_type,
                event_type=event["type"],
                sequence_number=current_version + i,
                data=event["data"],
                event_metadata=event.get("metadata", {}),
            )
            self._session.add(stored)

        await self._session.commit()

    async def load(self, aggregate_id: str, since_version: int = 0) -> list[StoredEvent]:
        """Load all events for an aggregate since a given version."""
        result = await self._session.execute(
            select(StoredEvent)
            .where(StoredEvent.aggregate_id == aggregate_id)
            .where(StoredEvent.sequence_number > since_version)
            .order_by(StoredEvent.sequence_number)
        )
        return result.scalars().all()

class OptimisticConcurrencyError(Exception):
    pass

Aggregate: Rebuilding State from Events

# document_aggregate.py
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

class DocumentStatus(str, Enum):
    UPLOADED = "uploaded"
    PROCESSING = "processing"
    PROCESSED = "processed"
    CLASSIFIED = "classified"
    FAILED = "failed"

@dataclass
class DocumentAggregate:
    """
    Document aggregate - state rebuilt from event history.
    All state changes go through apply_event() - never direct assignment.
    """
    id: str = ""
    status: DocumentStatus = DocumentStatus.UPLOADED
    storage_key: str = ""
    ocr_text: str = ""
    label: str = ""
    confidence: float = 0.0
    model_version: str = ""
    owner_id: str = ""
    filename: str = ""
    version: int = 0
    failed_reason: str = ""
    events: list[dict] = field(default_factory=list, repr=False)

    def apply_event(self, event_type: str, data: dict) -> None:
        """Apply a single event to mutate state."""
        self.version += 1

        if event_type == "DocumentUploaded":
            self.id = data["doc_id"]
            self.storage_key = data["storage_key"]
            self.owner_id = data["user_id"]
            self.filename = data["filename"]
            self.status = DocumentStatus.UPLOADED

        elif event_type == "DocumentProcessingStarted":
            self.status = DocumentStatus.PROCESSING

        elif event_type == "DocumentProcessed":
            self.ocr_text = data["text"]
            self.status = DocumentStatus.PROCESSED

        elif event_type == "DocumentClassified":
            self.label = data["label"]
            self.confidence = data["confidence"]
            self.model_version = data["model_version"]
            self.status = DocumentStatus.CLASSIFIED

        elif event_type == "DocumentProcessingFailed":
            self.status = DocumentStatus.FAILED
            self.failed_reason = data["reason"]

        else:
            raise ValueError(f"Unknown event type: {event_type}")

    @classmethod
    def rebuild(cls, events: list) -> "DocumentAggregate":
        """Rebuild the current state by replaying all events in order."""
        aggregate = cls()
        for stored_event in events:
            aggregate.apply_event(stored_event.event_type, stored_event.data)
        return aggregate

    def upload(self, doc_id: str, storage_key: str, user_id: str, filename: str) -> None:
        """Command: initiate an upload. Produces an event."""
        event = {
            "type": "DocumentUploaded",
            "data": {
                "doc_id": doc_id,
                "storage_key": storage_key,
                "user_id": user_id,
                "filename": filename,
            },
        }
        self.apply_event(event["type"], event["data"])
        self.events.append(event)  # Events to be saved to event store

    def classify(self, label: str, confidence: float, model_version: str) -> None:
        if self.status not in (DocumentStatus.PROCESSED, DocumentStatus.UPLOADED):
            raise ValueError(f"Cannot classify a document in status {self.status}")

        event = {
            "type": "DocumentClassified",
            "data": {"label": label, "confidence": confidence, "model_version": model_version},
        }
        self.apply_event(event["type"], event["data"])
        self.events.append(event)

# Usage in a command handler
async def handle_upload_command(
    doc_id: str, storage_key: str, user_id: str, filename: str,
    event_store: EventStore
) -> DocumentAggregate:
    aggregate = DocumentAggregate()
    aggregate.upload(doc_id, storage_key, user_id, filename)

    await event_store.append(
        aggregate_id=doc_id,
        aggregate_type="Document",
        events=aggregate.events,
        expected_version=0,  # New aggregate - must not exist yet
    )

    # Publish to Kafka for other services to consume
    # (assumes `kafka_producer` is an EventProducer instance created elsewhere)
    await kafka_producer.publish("doc.uploaded", {
        "doc_id": doc_id, "user_id": user_id, "storage_key": storage_key
    })

    return aggregate

Snapshots - Avoiding Full Replay for Old Aggregates

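# snapshot_store.py
# After 100+ events, rebuilding from scratch is slow.
# Periodically snapshot the aggregate state.
# Assumes a Snapshot model (primary key: aggregate_id) defined elsewhere
# and a PostgreSQL database.
from dataclasses import asdict
from datetime import datetime

from sqlalchemy.dialects.postgresql import insert  # on_conflict_do_update is PostgreSQL-specific
from sqlalchemy.ext.asyncio import AsyncSession

class SnapshotStore:
    def __init__(self, session: AsyncSession):
        self._session = session

    async def save(self, aggregate: DocumentAggregate) -> None:
        # Serialise current state, leaving out the list of not-yet-stored events
        state = {k: v for k, v in asdict(aggregate).items() if k != "events"}
        await self._session.execute(
            insert(Snapshot).values(
                aggregate_id=aggregate.id,
                aggregate_type="Document",
                version=aggregate.version,
                state=state,
                created_at=datetime.utcnow(),
            ).on_conflict_do_update(
                index_elements=["aggregate_id"],
                set_={"version": aggregate.version, "state": state},
            )
        )
        await self._session.commit()

    async def load(self, aggregate_id: str) -> tuple[DocumentAggregate | None, int]:
        """Returns (snapshot aggregate, version) or (None, 0) if no snapshot."""
        snapshot = await self._session.get(Snapshot, aggregate_id)
        if not snapshot:
            return None, 0
        agg = DocumentAggregate(**snapshot.state)
        return agg, snapshot.version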

# Rebuilding with snapshot optimisation
async def load_document(doc_id: str, event_store: EventStore, snapshot_store: SnapshotStore):
    # Try snapshot first
    aggregate, snapshot_version = await snapshot_store.load(doc_id)

    # Load only events AFTER the snapshot
    events_since = await event_store.load(doc_id, since_version=snapshot_version)

    if aggregate is None:
        aggregate = DocumentAggregate()

    # Replay only the events since the snapshot
    for event in events_since:
        aggregate.apply_event(event.event_type, event.data)

    # If we replayed many events, save a new snapshot
    if len(events_since) > 50:
        await snapshot_store.save(aggregate)

    return aggregate

Part 6: CQRS - Command Query Responsibility Segregation

CQRS separates write operations (commands) from read operations (queries). Each has its own model optimised for its purpose.

# WRITE MODEL (normalised, consistent, uses event store)
# Commands → Domain Events → Event Store → Kafka

# READ MODEL (denormalised, fast to query, updated by consuming events)
# Kafka Events → Read Model Updater → PostgreSQL read tables (or Elasticsearch)

# read_model.py - the denormalised table that reads query against
# Base is the DeclarativeBase defined in event_store.py
from sqlalchemy import Column, DateTime, Float, String, Text

class DocumentReadModel(Base):
    __tablename__ = "document_read_model"

    id = Column(String(36), primary_key=True)
    filename = Column(String(500))
    owner_id = Column(String(36), index=True)
    status = Column(String(50), index=True)
    label = Column(String(100), nullable=True, index=True)
    confidence = Column(Float, nullable=True)
    model_version = Column(String(50), nullable=True)
    storage_key = Column(String(1000))
    ocr_text_preview = Column(Text, nullable=True)  # First 500 chars for preview
    uploaded_at = Column(DateTime)
    classified_at = Column(DateTime, nullable=True)
    # Denormalised for fast listing
    owner_email = Column(String(255), nullable=True)
    owner_name = Column(String(255), nullable=True)

# Read Model Updater - consumes events and updates the read model
from datetime import datetime

class ReadModelUpdater:
    """
    Consumes domain events from Kafka and updates the denormalised read model.
    This is the PROJECTION - turning events into a queryable view.
    """

    def __init__(self, session_factory, event_consumer: EventConsumer):
        self._session_factory = session_factory
        self._consumer = event_consumer

    async def run(self) -> None:
        await self._consumer.consume(self.handle_event)

    async def handle_event(self, event: dict) -> None:
        event_type = event.get("type") or event.get("event_type")
        async with self._session_factory() as session:
            # Prefer an explicit type field. The field-based fallbacks are heuristics
            # and can misfire: a doc.processed event also carries doc_id and storage_key.
            if event_type == "DocumentUploaded" or ("doc_id" in event and "storage_key" in event):
                await self._handle_uploaded(event, session)
            elif event_type == "DocumentClassified" or ("label" in event and "confidence" in event):
                await self._handle_classified(event, session)
            elif event_type == "DocumentProcessingFailed":
                await self._handle_failed(event, session)

    async def _handle_uploaded(self, event: dict, session: AsyncSession) -> None:
        from sqlalchemy.dialects.postgresql import insert as pg_insert
        # ON CONFLICT DO NOTHING keeps the projection idempotent when an event is redelivered
        stmt = (
            pg_insert(DocumentReadModel)
            .values(
                id=event["doc_id"],
                filename=event.get("filename", ""),
                owner_id=event["user_id"],
                status="uploaded",
                storage_key=event["storage_key"],
                uploaded_at=datetime.utcnow(),
            )
            .on_conflict_do_nothing(index_elements=["id"])
        )
        await session.execute(stmt)
        await session.commit()

    async def _handle_classified(self, event: dict, session: AsyncSession) -> None:
        from sqlalchemy import update
        await session.execute(
            update(DocumentReadModel)
            .where(DocumentReadModel.id == event["doc_id"])
            .values(
                label=event["label"],
                confidence=event["confidence"],
                model_version=event.get("model_version"),
                status="classified",
                classified_at=datetime.utcnow(),
            )
        )
        await session.commit()

    async def _handle_failed(self, event: dict, session: AsyncSession) -> None:
        # Minimal sketch: only the status is updated, since the read model
        # has no column for the failure reason
        from sqlalchemy import update
        await session.execute(
            update(DocumentReadModel)
            .where(DocumentReadModel.id == event["doc_id"])
            .values(status="failed")
        )
        await session.commit()

# Query side - fast reads against the denormalised table
from sqlalchemy import select

class DocumentQueryService:
    def __init__(self, session: AsyncSession):
        self._session = session

    async def list_user_documents(
        self, user_id: str, status: str | None = None, page: int = 1, size: int = 20
    ) -> dict:
        """Query the read model - no joins, no event replay."""
        query = select(DocumentReadModel).where(DocumentReadModel.owner_id == user_id)
        if status:
            query = query.where(DocumentReadModel.status == status)
        query = query.order_by(DocumentReadModel.uploaded_at.desc())
        query = query.offset((page - 1) * size).limit(size)

        result = await self._session.execute(query)
        docs = result.scalars().all()

        return {
            "documents": [
                {
                    "id": doc.id,
                    "filename": doc.filename,
                    "status": doc.status,
                    "label": doc.label,
                    "confidence": doc.confidence,
                    "uploaded_at": doc.uploaded_at.isoformat(),
                }
                for doc in docs
            ],
            "page": page,
            "size": size,
        }

Part 7: Saga Pattern - Distributed Transactions

The Problem: You Cannot Have ACID Across Services

PlaceOrder saga:
1. Order Service: create order (status: PENDING)
2. Inventory Service: reserve items
3. Payment Service: charge card
4. Shipping Service: create shipment

If step 3 (Payment) fails after step 2 (Inventory) succeeded,
you have reserved inventory that will never be shipped.
You need to COMPENSATE: unreserve the inventory.

Traditional database transactions don't work across service boundaries.
The Saga pattern solves this with compensating transactions.

Choreography-Based Saga

In choreography, each service knows which event to publish next and which event triggers a compensation.

# saga_events.py - event schema for the PlaceOrder saga

# Forward events (happy path)
SAGA_EVENTS = {
"order.placed": "Inventory Service consumes → publishes inventory.reserved",
"inventory.reserved": "Payment Service consumes → publishes payment.charged",
"payment.charged": "Shipping Service consumes → publishes shipment.created",
"shipment.created": "Order Service consumes → updates order to CONFIRMED",
}

# Compensation events (failure path)
SAGA_COMPENSATIONS = {
"payment.failed": "Inventory Service consumes → unreserves items",
"shipping.failed": "Payment Service consumes → issues refund; Inventory Service consumes → unreserves items",
"inventory.unavailable": "Order Service consumes → cancels order",
}

# order_service/handlers/order_handlers.py
class OrderSagaHandler:
"""Handles saga events for the Order Service."""

async def handle_order_placed(self, event: dict) -> None:
"""Step 1: Create order and publish order.placed."""
order_id = str(uuid.uuid4())
async with self._session_factory() as db:
order = Order(
id=order_id,
user_id=event["user_id"],
items=event["items"],
total=event["total"],
status="pending",
)
db.add(order)
await db.commit()

await self._producer.publish("order.placed", {
"order_id": order_id,
"user_id": event["user_id"],
"items": event["items"],
"total": event["total"],
"saga_id": str(uuid.uuid4()), # Correlate all saga events
})

async def handle_shipment_created(self, event: dict) -> None:
"""Final step: Order is confirmed."""
async with self._session_factory() as db:
await db.execute(
update(Order)
.where(Order.id == event["order_id"])
.values(status="confirmed", shipment_id=event["shipment_id"])
)
await db.commit()

async def handle_inventory_unavailable(self, event: dict) -> None:
"""Compensation: Inventory not available → cancel order."""
async with self._session_factory() as db:
await db.execute(
update(Order)
.where(Order.id == event["order_id"])
.values(status="cancelled", cancellation_reason="inventory_unavailable")
)
await db.commit()

# Notify user
await self._producer.publish("notification.send", {
"user_id": event["user_id"],
"message": "Your order was cancelled - items are out of stock",
})

# inventory_service/handlers/inventory_handlers.py
class InventoryHandler:
async def handle_order_placed(self, event: dict) -> None:
"""Reserve items for the order."""
order_id = event["order_id"]
items = event["items"]

async with self._session_factory() as db:
for item in items:
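# Lock the row until commit so two concurrent orders cannot both pass the stock check
product = await db.get(Product, item["product_id"], with_for_update=True)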
if not product or product.stock < item["quantity"]:
# Cannot fulfil - publish compensation event
await self._producer.publish("inventory.unavailable", {
"order_id": order_id,
"user_id": event["user_id"],
"saga_id": event["saga_id"],
"reason": f"Product {item['product_id']} has insufficient stock",
})
return

# Reserve all items atomically (within this service's DB)
for item in items:
product = await db.get(Product, item["product_id"])
product.stock -= item["quantity"]
db.add(Reservation(
order_id=order_id,
product_id=item["product_id"],
quantity=item["quantity"],
))
await db.commit()

await self._producer.publish("inventory.reserved", {
"order_id": order_id,
"user_id": event["user_id"],
"saga_id": event["saga_id"],
"items": items,
"total": event["total"], # Payment Service reads event["total"] when charging
})

async def handle_payment_failed(self, event: dict) -> None:
"""Compensation: Payment failed → unreserve items."""
order_id = event["order_id"]
async with self._session_factory() as db:
reservations = await db.execute(
select(Reservation).where(Reservation.order_id == order_id)
)
for reservation in reservations.scalars():
product = await db.get(Product, reservation.product_id)
product.stock += reservation.quantity
await db.delete(reservation)
await db.commit()

logger.info(f"Compensation complete: unreserved items for order {order_id}")

# payment_service/handlers/payment_handlers.py
class PaymentHandler:
async def handle_inventory_reserved(self, event: dict) -> None:
"""Charge the payment method."""
order_id = event["order_id"]
try:
charge_id = await self._stripe.charge(
user_id=event["user_id"],
amount=event["total"],
idempotency_key=f"order-{order_id}", # Stripe idempotency
)
await self._producer.publish("payment.charged", {
"order_id": order_id,
"user_id": event["user_id"],
"saga_id": event["saga_id"],
"charge_id": charge_id,
})
except PaymentDeclinedError as exc:
await self._producer.publish("payment.failed", {
"order_id": order_id,
"user_id": event["user_id"],
"saga_id": event["saga_id"],
"reason": str(exc),
})
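
To see the event routing above end to end, here is a minimal in-memory sketch of the same choreography. The `Bus` class, the handler functions, and the rule that the Order Service also cancels the order on payment.failed are assumptions made for illustration; they are not part of the handlers above, and a real deployment would use Kafka topics and per-service databases.

```python
from collections import defaultdict, deque

class Bus:
    """Hypothetical in-memory stand-in for a message broker."""
    def __init__(self):
        self.handlers = defaultdict(list)
        self.queue = deque()
        self.log = []  # every published event type, in publish order

    def subscribe(self, topic, handler):
        self.handlers[topic].append(handler)

    def publish(self, topic, event):
        self.log.append(topic)
        self.queue.append((topic, event))

    def run(self):
        # Deliver queued events until no handler publishes anything new.
        while self.queue:
            topic, event = self.queue.popleft()
            for handler in self.handlers[topic]:
                handler(event)

def run_place_order(card_ok: bool):
    bus = Bus()
    state = {"stock": 5, "order": "pending"}

    def reserve(event):    # Inventory Service
        state["stock"] -= event["qty"]
        bus.publish("inventory.reserved", event)

    def charge(event):     # Payment Service
        bus.publish("payment.charged" if card_ok else "payment.failed", event)

    def unreserve(event):  # Inventory Service compensation
        state["stock"] += event["qty"]

    def cancel(event):     # Order Service (assumed subscriber to payment.failed)
        state["order"] = "cancelled"

    def ship(event):       # Shipping Service
        bus.publish("shipment.created", event)

    def confirm(event):    # Order Service
        state["order"] = "confirmed"

    bus.subscribe("order.placed", reserve)
    bus.subscribe("inventory.reserved", charge)
    bus.subscribe("payment.charged", ship)
    bus.subscribe("shipment.created", confirm)
    bus.subscribe("payment.failed", unreserve)
    bus.subscribe("payment.failed", cancel)

    bus.publish("order.placed", {"order_id": "o-1", "qty": 2})
    bus.run()
    return state, bus.log
```

Calling `run_place_order(card_ok=False)` walks the failure path: stock is restored and the event log ends at payment.failed. No function in the sketch describes the whole flow; it only emerges from the subscriptions, which is the main trade-off of choreography.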

Tracking Saga State (for debugging and monitoring)

# saga_tracker.py - stores saga state for observability
# This is NOT required for the saga to work, but invaluable for debugging

class SagaTracker:
async def record_event(self, saga_id: str, event_type: str, event: dict) -> None:
async with self._session_factory() as db:
db.add(SagaEvent(
saga_id=saga_id,
event_type=event_type,
event_data=event,
recorded_at=datetime.utcnow(),
))
await db.commit()

async def get_saga_timeline(self, saga_id: str) -> list[dict]:
"""Return all events for a saga in order - for debugging."""
async with self._session_factory() as db:
result = await db.execute(
select(SagaEvent)
.where(SagaEvent.saga_id == saga_id)
.order_by(SagaEvent.recorded_at)
)
return [
{"event_type": e.event_type, "data": e.event_data, "at": e.recorded_at.isoformat()}
for e in result.scalars()
]
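
A timeline in the shape returned by get_saga_timeline can also be used to flag sagas that stopped halfway. The sketch below is one possible approach, not part of SagaTracker: `saga_status`, the two terminal-event sets, and the five-minute timeout are hypothetical, and the event names are taken from the saga tables earlier in this section.

```python
from datetime import datetime, timedelta

# Assumed terminal events, based on the event names used in this section.
SUCCESS_EVENTS = {"shipment.created"}
FAILURE_EVENTS = {"payment.failed", "shipping.failed", "inventory.unavailable"}

def saga_status(timeline: list[dict], now: datetime,
                timeout: timedelta = timedelta(minutes=5)) -> str:
    """Classify a saga from its ordered timeline (oldest event first)."""
    if not timeline:
        return "unknown"
    seen = {entry["event_type"] for entry in timeline}
    if seen & FAILURE_EVENTS:
        return "failed"
    if seen & SUCCESS_EVENTS:
        return "completed"
    # No terminal event yet: stuck if the last event is older than the timeout.
    last_seen = datetime.fromisoformat(timeline[-1]["at"])
    return "stuck" if now - last_seen > timeout else "in_progress"
```

A periodic job could run this over recent saga IDs and alert on "stuck", which is often the first sign that a consumer is down or an event was lost.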

Interview Patterns

Q: What is the difference between at-least-once and exactly-once delivery in Kafka, and how do you achieve idempotency?

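A: At-least-once means a message may be delivered more than once, for example when a consumer crashes after processing a message but before committing its offset. Kafka's exactly-once semantics combine an idempotent producer (enable.idempotence=true) with transactions (a transactional.id on the producer, and consumers reading with isolation.level=read_committed). That guarantee covers read-process-write flows that stay inside Kafka; it does not extend to side effects in external systems such as a database or an email provider, and it adds complexity and latency. The practical approach is at-least-once delivery with idempotent consumers: each consumer records the processed event ID in its database, ideally in the same transaction as the side effect, before committing the offset, and skips event IDs it has already recorded.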
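
The idempotent-consumer idea can be sketched with the standard-library sqlite3 module. The table names, the `event_id` field, and the `handle_once` helper are assumptions for illustration; the point is that the dedupe marker and the side effect commit in one transaction.

```python
import sqlite3

def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE doc_counts (label TEXT PRIMARY KEY, n INTEGER NOT NULL)")
    return conn

def handle_once(conn: sqlite3.Connection, event: dict) -> bool:
    """Apply the event at most once. Returns False for a duplicate delivery."""
    try:
        with conn:  # one transaction: dedupe marker and side effect commit together
            conn.execute(
                "INSERT INTO processed_events (event_id) VALUES (?)",
                (event["event_id"],),
            )
            conn.execute(
                "INSERT INTO doc_counts (label, n) VALUES (?, 1) "
                "ON CONFLICT(label) DO UPDATE SET n = n + 1",
                (event["label"],),
            )
        return True
    except sqlite3.IntegrityError:
        # Primary-key violation: this event_id was already processed.
        return False
```

Only after `handle_once` returns (True or False) would the consumer commit the Kafka offset. If the process crashes before that commit, the redelivered message hits the primary-key check and is skipped.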

Q: How does Event Sourcing differ from storing the current state?

A: Traditional persistence stores only the latest state - an UPDATE overwrites previous data. Event sourcing appends immutable events and derives state by replaying them. Benefits: full audit history, ability to rebuild state at any point in time, ability to add new projections retroactively by replaying historical events. Costs: eventual consistency in read models, event schema evolution challenges, potentially slow aggregate load for long-lived entities (mitigated by snapshots).
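
Replay and snapshots can be illustrated with a small pure function. This is a sketch under assumptions: `DocumentState`, `apply`, and `load` are hypothetical names, and the event types mirror the document events used earlier in this guide.

```python
from dataclasses import dataclass, replace
from typing import Optional

@dataclass(frozen=True)
class DocumentState:
    status: str = "new"
    label: Optional[str] = None
    version: int = 0  # number of events applied so far

def apply(state: DocumentState, event: dict) -> DocumentState:
    """Pure transition function: state + event -> new state."""
    if event["type"] == "DocumentUploaded":
        state = replace(state, status="uploaded")
    elif event["type"] == "DocumentClassified":
        state = replace(state, status="classified", label=event["label"])
    elif event["type"] == "DocumentProcessingFailed":
        state = replace(state, status="failed")
    return replace(state, version=state.version + 1)

def load(events: list, snapshot: Optional[DocumentState] = None) -> DocumentState:
    """Rebuild state by replaying events, skipping those a snapshot already covers."""
    state = snapshot if snapshot is not None else DocumentState()
    for event in events[state.version:]:
        state = apply(state, event)
    return state
```

Passing a prefix of the event list (`load(events[:1])`) gives the state at an earlier point in time, and passing a snapshot avoids replaying events the snapshot has already absorbed.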

Q: In a choreography-based saga, a payment fails after inventory was reserved. How do you ensure inventory is unreserved?

A: The Payment Service publishes a payment.failed event. The Inventory Service subscribes to payment.failed and runs a compensating transaction (unreserve items). For this to be reliable, the payment.failed event must be durably published to Kafka before the payment failure is reported to the user. The Inventory Service must be idempotent for the compensation - if payment.failed is delivered twice, unreserving twice should be a no-op (check if reservation still exists before unreserving).
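
The "check whether the reservation still exists" rule can be sketched in memory as follows. The `Inventory` class and its method names are hypothetical; a real service would do the same lookup and delete inside a database transaction.

```python
class Inventory:
    """Hypothetical in-memory inventory with per-order reservations."""
    def __init__(self, stock: dict):
        self.stock = dict(stock)
        self.reservations = {}  # order_id -> {product_id: quantity}

    def reserve(self, order_id: str, items: dict) -> None:
        for product_id, qty in items.items():
            self.stock[product_id] -= qty
        self.reservations[order_id] = dict(items)

    def on_payment_failed(self, order_id: str) -> bool:
        """Release the reservation. Returns False if there was nothing to release."""
        items = self.reservations.pop(order_id, None)
        if items is None:
            return False  # duplicate delivery or unknown order: no-op
        for product_id, qty in items.items():
            self.stock[product_id] += qty
        return True
```

Because the reservation record is removed in the same step that restores stock, a second payment.failed for the same order finds nothing to release and leaves stock unchanged.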

Q: What is the difference between choreography and orchestration in the Saga pattern?

A: In choreography, each service knows which events to publish and react to - no central coordinator. It is simpler but harder to trace; you must read all service codebases to understand the saga flow. In orchestration, a dedicated Saga Orchestrator sends commands to each service and tracks overall state. The flow is visible in one place but adds a central point of failure and coupling. Choreography is preferred for simpler sagas (3–4 steps); orchestration for complex workflows where observability and rollback logic are critical.
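
For contrast with the choreography code earlier, here is a minimal orchestration sketch. `run_saga`, `SagaFailed`, and the step tuples are hypothetical; a production orchestrator would also persist its state so it can resume after a crash, which this example does not attempt.

```python
from typing import Callable

# Each step is (name, action, compensation); both callables receive a shared context.
Step = tuple[str, Callable[[dict], None], Callable[[dict], None]]

class SagaFailed(Exception):
    """Raised after compensations have run; args[0] holds the step history."""

def run_saga(steps: list, ctx: dict) -> list:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    history = []
    done = []
    for step in steps:
        name, action, _ = step
        try:
            action(ctx)
        except Exception as exc:
            history.append(f"{name}:failed")
            for done_name, _, compensate in reversed(done):
                compensate(ctx)
                history.append(f"{done_name}:compensated")
            raise SagaFailed(history) from exc
        done.append(step)
        history.append(f"{name}:ok")
    return history
```

The whole flow, including the rollback order, is readable in one function. That visibility is the benefit of orchestration, and the single coordinator is its cost.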

Q: Why is the CQRS pattern often used with Event Sourcing?

A: Event Sourcing naturally produces a stream of domain events. CQRS provides a clean use for those events - building optimised, denormalised read models (projections). Without CQRS, you would need to query the event store directly for every read, which requires replaying events or maintaining complex views. With CQRS, the write model (event store) stays clean and the read model (relational tables, Elasticsearch) is updated by consuming events, giving fast reads without compromising write model integrity.
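
As a rough illustration, two independent read models can be derived from one event stream. The function names and the event shape (`type`, `doc_id`, `label`) are assumptions for this sketch; in practice each projection would be a consumer writing to its own table or index.

```python
from collections import Counter

def project_label_counts(events: list) -> dict:
    """Hypothetical read model: number of classified documents per label."""
    counts = Counter()
    for event in events:
        if event["type"] == "DocumentClassified":
            counts[event["label"]] += 1
    return dict(counts)

def project_status_by_doc(events: list) -> dict:
    """A second read model from the same stream: latest status per document."""
    status = {}
    for event in events:
        if event["type"] == "DocumentUploaded":
            status[event["doc_id"]] = "uploaded"
        elif event["type"] == "DocumentClassified":
            status[event["doc_id"]] = "classified"
    return status
```

Adding the second projection required no change to the write side: it is just another pass over the same events, which is what makes retroactive projections possible.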

© 2026 EngineersOfAI. All rights reserved.